// Copyright (c) 2023 The TQUIC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A QUIC endpoint.

#![allow(dead_code)]
#![allow(unused_variables)]

use std::cell::RefCell;
use std::cmp;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;

use bytes::Bytes;
use bytes::BytesMut;
use log::*;
use rand::Rng;
use ring::hmac;
use rustc_hash::FxHashMap;
use rustc_hash::FxHashSet;
use slab::Slab;

use crate::connection::Connection;
use crate::error::Error;
use crate::packet;
use crate::packet::PacketHeader;
use crate::packet::PacketType;
use crate::timer_queue::TimerQueue;
use crate::token::AddressToken;
use crate::token::AddressTokenType::*;
use crate::token::ResetToken;
use crate::ConnectionId;
use crate::ConnectionIdGenerator;
use crate::ConnectionQueues;
use crate::Event;
use crate::FourTuple;
use crate::PacketInfo;
use crate::PacketSendHandler;
use crate::Result;
use crate::TransportHandler;

/// Endpoint is an entity that can participate in a QUIC connection by
/// generating, receiving, and processing QUIC packets.
///
/// There are two types of endpoints in QUIC: client and server. An endpoint
/// may maintain one or more QUIC connections. Endpoint provides a high level
/// API to use the QUIC library.
pub struct Endpoint {
    /// Whether this is a server endpoint.
    is_server: bool,

    /// QUIC configuration.
    config: Box<crate::Config>,

    /// All connections on the endpoint, keyed by connection index.
    conns: ConnectionTable,

    /// Used for matching incoming packets to connections.
    routes: ConnectionRoutes,

    /// Connections ordered by expiration time.
    timers: TimerQueue,

    /// Various connection queues. The handle is shared with every connection
    /// through `Connection::set_queues()`.
    queues: Rc<RefCell<ConnectionQueues>>,

    /// Connection ID generator.
    cid_gen: Box<dyn ConnectionIdGenerator>,

    /// Used to communicate with the application code.
    handler: Box<dyn TransportHandler>,

    /// Used to send packets out.
    sender: Rc<dyn PacketSendHandler>,

    /// Packets generated by the endpoint and waiting to be sent.
    packets: PacketQueue,

    /// Whether the endpoint has been shut down. A closed endpoint does not
    /// create new connections.
    closed: bool,

    /// The unique trace id for the endpoint.
    trace_id: String,
}

impl Endpoint {
    /// Create a QUIC endpoint.
    ///
    /// `is_server` selects the role of the endpoint, `handler` receives the
    /// connection and stream callbacks, and `sender` is used to send the
    /// generated packets out. The trace id defaults to `"SERVER"` or
    /// `"CLIENT"` depending on the role.
    pub fn new(
        config: Box<crate::Config>,
        is_server: bool,
        handler: Box<dyn TransportHandler>,
        sender: Rc<dyn PacketSendHandler>,
    ) -> Self {
        // Everything derived from `config` is read before `config` itself is
        // moved into the endpoint.
        let packets = PacketQueue::new(config.send_batch_size);
        let cid_gen = Box::new(crate::RandomConnectionIdGenerator {
            cid_len: config.cid_len,
            cid_lifetime: None,
        });
        let trace_id = String::from(if is_server { "SERVER" } else { "CLIENT" });

        Self {
            is_server,
            config,
            conns: ConnectionTable::new(),
            routes: ConnectionRoutes::new(),
            timers: TimerQueue::new(),
            queues: Rc::new(RefCell::new(ConnectionQueues::new())),
            cid_gen,
            handler,
            sender,
            packets,
            closed: false,
            trace_id,
        }
    }

    /// Create a client connection.
    ///
    /// The caller should call `process_connections()` after one or more connect().
    pub fn connect(
        &mut self,
        local: SocketAddr,
        remote: SocketAddr,
        server_name: Option<&str>,
        session: Option<&[u8]>,
        token: Option<&[u8]>,
    ) -> Result<u64> {
        if self.is_server {
            return Err(Error::InvalidOperation("not a client".into()));
        }
        if self.closed {
            return Err(Error::InvalidOperation("already closed".into()));
        }

        // Create a client connection.
        let scid = self.cid_gen.generate();
        let conn = Connection::new_client(&scid, local, remote, server_name, &mut self.config)?;
        let idx = self.conns.insert(conn);
        if let Some(conn) = self.conns.get_mut(idx) {
            conn.set_index(idx);
            conn.set_queues(self.queues.clone());
            if let Some(session) = session {
                conn.set_session(session)?;
            }
            if let Some(token) = token {
                conn.set_token(token.to_vec())?;
            }
            conn.start_handshake()?;

            self.handler.on_conn_created(conn);
            conn.mark_tickable(true);
        }

        // Update the connection route.
        if scid.len() > 0 {
            self.routes.insert_with_cid(scid, idx);
        } else {
            self.routes
                .insert_with_addr(FourTuple { local, remote }, idx);
        }

        Ok(idx)
    }

    /// Process an incoming UDP datagram.
    ///
    /// Incoming packets are classified on receipt. Packets can either be
    /// associated with an existing connection or, for servers, potentially
    /// create a new connection.
    /// See RFC 9000 Section 5.2 Matching Packets to Connections.
    ///
    /// A datagram that matches no existing connection is handled as follows:
    /// * On a client it is never used to create a connection; a Stateless
    ///   Reset is queued when `config.stateless_reset` is enabled.
    /// * On a server an Initial packet may create a new connection, or
    ///   trigger a Version Negotiation or Retry packet.
    /// * On a server a 1-RTT packet with a non-empty DCID triggers a
    ///   Stateless Reset when `config.stateless_reset` is enabled.
    /// * Anything else is silently ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if the packet header cannot be parsed, if the target
    /// connection fails to process the datagram, if an invalid Retry token is
    /// presented, or if a response packet cannot be generated.
    pub fn recv(&mut self, buf: &mut [u8], info: &PacketInfo) -> Result<()> {
        trace!(
            "{} recv packet {} bytes {:?}",
            &self.trace_id,
            buf.len(),
            info
        );

        let cid_len = self.cid_gen.cid_len();
        let (mut hdr, _) = PacketHeader::from_bytes(buf, cid_len)?;
        let (local, remote) = (info.dst, info.src);

        // Deliver the datagram to the target connection.
        // TODO: support stateless reset
        if let Some(c) = self.routes.find(&hdr.dcid, buf, info) {
            if let Some(conn) = self.conns.get_mut(*c) {
                conn.mark_tickable(true);
                conn.recv(buf, info).map(|_| ())?;
                return Ok(());
            }
        }

        // A client never accepts new connections: optionally answer with a
        // Stateless Reset, then drop the datagram. Returning here in every
        // case keeps a client from falling through to the server-only code
        // below when stateless reset is disabled.
        if !self.is_server {
            if self.config.stateless_reset {
                self.send_stateless_reset(buf.len(), &hdr.dcid, local, remote)?;
            }
            return Ok(());
        }

        // Try to create a new connection for server
        if hdr.pkt_type == PacketType::Initial && !self.closed {
            // Check max concurrent connections limit
            if self.conns.len() >= self.config.max_concurrent_conns as usize {
                return Ok(());
            }

            // Validate version of the packet
            if !crate::version_is_supported(hdr.version) {
                return self.send_version_negotiation(&hdr, local, remote);
            }

            // Validate token of the Initial packet
            let token = if let Some(ref mut token) = hdr.token {
                match self.validate_address_token(token, &remote, &hdr.dcid) {
                    Ok(token) => Some(token),
                    Err(_) => match AddressToken::token_type(token) {
                        // In response to processing an Initial packet containing
                        // a token that was provided in a Retry packet, a server
                        // cannot send another Retry packet; it can only refuse
                        // the connection or permit it to proceed.
                        Ok(RetryToken) => return Err(Error::InvalidToken),
                        _ => return self.send_retry(&hdr, local, remote),
                    },
                }
            } else if self.config.retry {
                return self.send_retry(&hdr, local, remote);
            } else {
                None
            };

            // Use the original DCID carried by the token when there is one.
            // Otherwise the DCID of this packet is the original one. Falling
            // back instead of unwrapping avoids a panic on peer-supplied data.
            let odcid = token
                .as_ref()
                .and_then(|t| t.odcid)
                .unwrap_or(hdr.dcid);

            // Create a server connection
            let scid = self.cid_gen.generate();
            let conn =
                Connection::new_server(&scid, local, remote, token.as_ref(), &mut self.config)?;
            let idx = self.conns.insert(conn);
            if cid_len > 0 {
                self.routes.insert_with_cid(scid, idx);
                self.routes.insert_with_cid(odcid, idx);
            } else {
                self.routes
                    .insert_with_addr(FourTuple { local, remote }, idx);
            }

            // Deliver the datagram to the created connection.
            if let Some(conn) = self.conns.get_mut(idx) {
                conn.set_index(idx);
                conn.set_queues(self.queues.clone());
                trace!(
                    "{} create a server connection {:?}",
                    conn.trace_id(),
                    &self.trace_id
                );

                self.handler.on_conn_created(conn);
                conn.mark_tickable(true);
                conn.recv(buf, info).map(|_| ())?;
            }
            return Ok(());
        }

        // Send the Stateless Reset packet for the unknown connection
        if hdr.pkt_type == PacketType::OneRTT && !hdr.dcid.is_empty() && self.config.stateless_reset
        {
            trace!(
                "endpoint send stateless reset: remote {:?} local {:?}",
                remote,
                local
            );
            self.send_stateless_reset(buf.len(), &hdr.dcid, local, remote)?;
            return Ok(());
        }

        // Ignore non-initial packet for unknown connection
        Ok(())
    }

    /// Decode and validate the address token.
    ///
    /// Every configured address token key is tried in order. The first key
    /// that decodes the token wins. An expired token stops the search
    /// immediately with `Error::ExpiredToken`; any other decode failure moves
    /// on to the next key. If no key succeeds, `Error::InvalidToken` is
    /// returned.
    fn validate_address_token(
        &mut self,
        addr_token: &mut [u8],
        cli_addr: &SocketAddr,
        cli_pkt_dcid: &ConnectionId,
    ) -> Result<AddressToken> {
        let lifetime = self.config.address_token_lifetime;

        for key in &self.config.address_token_key {
            let decoded = AddressToken::decode(key, addr_token, cli_addr, cli_pkt_dcid, lifetime);
            match decoded {
                Err(Error::ExpiredToken) => return Err(Error::ExpiredToken),
                Err(_) => {} // try the next key
                Ok(token) => return Ok(token),
            }
        }

        Err(Error::InvalidToken)
    }

    /// Write a Version Negotiation packet which will be sent later.
    ///
    /// The packet is built from the connection IDs of the client packet
    /// header and queued for sending from `local` to `remote`.
    fn send_version_negotiation(
        &mut self,
        cli_pkt_hdr: &PacketHeader,
        local: SocketAddr,
        remote: SocketAddr,
    ) -> Result<()> {
        let mut pkt_out = self.packets.get_buffer();
        let written = packet::version_negotiation(
            &cli_pkt_hdr.dcid,
            &cli_pkt_hdr.scid,
            pkt_out.as_mut_slice(),
        )?;
        // Keep only the bytes that were actually written.
        pkt_out.truncate(written);

        let pkt_info = PacketInfo {
            src: local,
            dst: remote,
            time: Instant::now(),
        };

        trace!(
            "{} send version negotiation: remote {:?} local {:?}",
            &self.trace_id,
            remote,
            local
        );
        self.packets.add_packet(pkt_out, pkt_info);
        Ok(())
    }

    /// Write an Retry packet which will be sent later.
    fn send_retry(
        &mut self,
        initial_pkt_hdr: &PacketHeader,
        local: SocketAddr,
        remote: SocketAddr,
    ) -> Result<()> {
        let mut pkt_out = self.packets.get_buffer();

        // Generate a retry token
        let rscid = self.cid_gen.generate();
        let token = AddressToken::new_retry_token(remote, initial_pkt_hdr.dcid, rscid);
        let token = token.encode(&self.config.address_token_key[0])?;

        // Write a Retry packet
        let len = packet::retry(
            &rscid,                // server cid
            &initial_pkt_hdr.scid, // client cid
            &initial_pkt_hdr.dcid, // original dcid
            &token,
            crate::QUIC_VERSION_V1,
            &mut pkt_out[..],
        )?;
        pkt_out.truncate(len);

        let pkt_info = PacketInfo {
            src: local,
            dst: remote,
            time: Instant::now(),
        };

        trace!(
            "{} send retry: remote {:?} local {:?}",
            &self.trace_id,
            remote,
            local
        );
        self.packets.add_packet(pkt_out, pkt_info);
        Ok(())
    }

    /// Write a Stateless Reset packet which will be sent later.
    ///
    /// `pkt_in_len` is the length of the datagram being answered. The padding
    /// is chosen so that the reset is shorter than that datagram; nothing is
    /// sent when the datagram is too short to allow the minimum padding.
    /// The reset token is derived from `dcid` with the configured reset
    /// token key.
    /// TODO: limit based on address
    fn send_stateless_reset(
        &mut self,
        pkt_in_len: usize,
        dcid: &ConnectionId,
        local: SocketAddr,
        remote: SocketAddr,
    ) -> Result<()> {
        const MIN_PADDING_LEN: usize = 5;
        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + crate::MAX_CID_LEN;

        // Largest padding that keeps the reset shorter than the incoming
        // datagram; bail out quietly if even the minimum does not fit.
        let max_padding_len = match pkt_in_len.checked_sub(crate::RESET_TOKEN_LEN) {
            Some(room) if room > MIN_PADDING_LEN => room - 1,
            _ => return Ok(()),
        };

        // Randomize the padding length when there is enough room for it.
        let padding_len = if max_padding_len > IDEAL_MIN_PADDING_LEN {
            rand::thread_rng().gen_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
        } else {
            max_padding_len
        };

        // Generate stateless reset token based on the dcid.
        let reset_token = ResetToken::generate(&self.config.reset_token_key, dcid);

        // Write a Stateless Reset packet.
        let mut pkt_out = self.packets.get_buffer();
        let written = packet::stateless_reset(padding_len, &reset_token, pkt_out.as_mut_slice())?;
        pkt_out.truncate(written);

        let pkt_info = PacketInfo {
            src: local,
            dst: remote,
            time: Instant::now(),
        };

        trace!("{} send stateless reset {:?}", &self.trace_id, pkt_info);
        self.packets.add_packet(pkt_out, pkt_info);
        Ok(())
    }

    /// Return the amount of time until the next timeout event.
    ///
    /// While packets or queued connections are still pending, the timer
    /// granularity is returned; otherwise the time remaining on the timer
    /// queue (if any) is returned.
    pub fn timeout(&self) -> Option<Duration> {
        let has_pending = {
            let queues = self.queues.borrow();
            !(self.packets.is_empty() && queues.is_empty())
        };

        if has_pending {
            // There are still events pending
            Some(crate::TIMER_GRANULARITY)
        } else {
            self.timers.time_remaining(Instant::now())
        }
    }

    /// Process timeout events on the endpoint.
    ///
    /// Every connection whose timer has expired at `now` is marked tickable
    /// and gets its own `on_timeout()` invoked. Expired entries that no
    /// longer map to a connection are skipped.
    pub fn on_timeout(&mut self, now: Instant) {
        while let Some(idx) = self.timers.next_expire(now) {
            let conn = match self.conns.get_mut(idx) {
                Some(conn) => conn,
                None => continue,
            };

            trace!(
                "{} process timeout event, connection index {} trace id {}",
                self.trace_id,
                idx,
                conn.trace_id(),
            );
            conn.mark_tickable(true);
            conn.on_timeout(now);
        }
    }

    /// Process internal events of all tickable connections.
    ///
    /// For each tickable connection this either cleans it up (if it is
    /// closed) or dispatches its pending events and stream notifications to
    /// the handler, marks it sendable and refreshes its timer. Afterwards all
    /// sendable connections are flushed through `send_packets_out()`, and
    /// connections that reported `is_ready()` are marked tickable again.
    ///
    /// # Errors
    ///
    /// Forwards any error returned by `send_packets_out()`.
    pub fn process_connections(&mut self) -> Result<()> {
        trace!(
            "{} process {} tickable connections",
            &self.trace_id,
            self.conn_tickable_len()
        );
        // Connections that must be ticked again after packets were sent.
        let mut ready = Vec::<u64>::new();

        // Process all tickable connections
        // NOTE(review): the loop relies on each iteration clearing the
        // tickable flag of `idx`; the `None => continue` arm below does not
        // do that — confirm that a tickable index always maps to a connection.
        while let Some(idx) = self.conn_tickable_next() {
            let conn = match self.conns.get_mut(idx) {
                Some(v) => v,
                None => continue,
            };

            // Try to clean up the closed connection.
            if conn.is_closed() {
                self.handler.on_conn_closed(conn);

                // Drop every reference the endpoint holds to the connection:
                // queue flags, timer, routes and finally the table entry.
                conn.mark_tickable(false);
                conn.mark_sendable(false);
                self.timers.del(&idx);
                self.routes.remove(conn);
                self.conns.remove(idx);
                continue;
            }

            // Try to process endpoint-facing events on the connection.
            while let Some(event) = conn.poll() {
                match event {
                    Event::ConnectionEstablished => self.handler.on_conn_established(conn),

                    Event::NewToken(token) => self.handler.on_new_token(conn, token),

                    // Generate `num` new source CIDs and register their routes.
                    Event::ScidToAdvertise(num) => {
                        let key = &self.config.reset_token_key;
                        Self::conn_add_scids(conn, num, &mut self.cid_gen, key, &mut self.routes);
                    }

                    Event::ScidRetired(cid) => self.routes.remove_with_cid(&cid),

                    Event::DcidAdvertised(token) => self.routes.insert_with_token(token, idx),

                    Event::DcidRetired(token) => self.routes.remove_with_token(&token),

                    Event::StreamCreated(stream_id) => {
                        self.handler.on_stream_created(conn, stream_id)
                    }

                    // Notify the application first, then release the stream.
                    Event::StreamClosed(stream_id) => {
                        self.handler.on_stream_closed(conn, stream_id);
                        conn.stream_destroy(stream_id);
                    }
                }
            }
            // Readability is re-checked per stream before each callback.
            for stream_id in conn.stream_readable_iter() {
                if conn.stream_check_readable(stream_id) {
                    self.handler.on_stream_readable(conn, stream_id);
                }
            }

            // Writability is re-checked per stream before each callback.
            for stream_id in conn.stream_writable_iter() {
                if conn.stream_check_writable(stream_id) {
                    self.handler.on_stream_writable(conn, stream_id);
                }
            }

            // Add the connection to the sendable queue
            conn.mark_sendable(true);

            // Try to update the timer of the connection
            if let Some(t) = conn.timeout() {
                self.timers.add(idx, t, Instant::now());
            } else {
                self.timers.del(&idx);
            }

            // Remember connections that still have work pending; they are
            // re-queued only after the send pass below.
            if conn.is_ready() {
                trace!("conn {} is still ready", conn.trace_id());
                ready.push(idx);
            }
            conn.mark_tickable(false);
        }

        // Process all sendable connections
        self.send_packets_out()?;

        // Add connection with internal events to the tickable queue again.
        for idx in ready {
            if let Some(conn) = self.conns.get_mut(idx) {
                conn.mark_tickable(true);
            }
        }

        Ok(())
    }

    /// Add scids for the given connection.
    ///
    /// Generates up to `num` connection IDs (with reset tokens derived from
    /// `token_key`), adds each to `conn` and registers a route for it in
    /// `routes`. Generation stops at the first CID the connection refuses.
    /// If the connection has no index, nothing is added, because no route
    /// could be registered for the new CIDs.
    fn conn_add_scids(
        conn: &mut Connection,
        num: u8,
        gen: &mut Box<dyn ConnectionIdGenerator>,
        token_key: &hmac::Key,
        routes: &mut ConnectionRoutes,
    ) {
        // Resolve the index once, before any CID is handed to the
        // connection, instead of unwrapping it after every successful add.
        let index = match conn.index() {
            Some(v) => v,
            None => {
                warn!("add scid to {:?} : missing connection index", conn.trace_id());
                return;
            }
        };

        for _ in 0..num {
            let (scid, reset_token) = gen.generate_cid_and_token(token_key);
            if let Err(e) = conn.add_scid(scid, reset_token, true) {
                warn!("add scid to {:?} : {:?}", conn.trace_id(), e);
                break;
            }

            routes.insert_with_cid(scid, index);
        }
    }

    /// Check whether a connection is routed by the given connection ID.
    ///
    /// Only the CID route table is consulted.
    pub(crate) fn conn_exist(&self, cid: ConnectionId) -> bool {
        self.routes.cid_table.contains_key(&cid)
    }

    /// Get the connection by index.
    ///
    /// Returns `None` if no connection is stored under `index`.
    pub fn conn_get_mut(&mut self, index: u64) -> Option<&mut Connection> {
        // The table hands out `&mut Box<Connection>`; unwrap the box so that
        // callers see a plain `&mut Connection`.
        self.conns.get_mut(index).map(|conn| &mut **conn)
    }

    /// Return the index of a tickable connection, if there is one.
    fn conn_tickable_next(&mut self) -> Option<u64> {
        // The lookup only reads the queues, so a shared borrow is enough and
        // cannot conflict with other outstanding shared borrows.
        let queues = self.queues.borrow();
        queues.tickable_next()
    }

    /// Return the number of connections currently in the tickable queue.
    fn conn_tickable_len(&self) -> usize {
        self.queues.borrow().tickable.len()
    }

    /// Return the index of a sendable connection, if there is one.
    fn conn_sendable_next(&mut self) -> Option<u64> {
        // The lookup only reads the queues, so a shared borrow is enough and
        // cannot conflict with other outstanding shared borrows.
        let queues = self.queues.borrow();
        queues.sendable_next()
    }

    /// Return the number of connections currently in the sendable queue.
    fn conn_sendable_len(&self) -> usize {
        self.queues.borrow().sendable.len()
    }

    /// Send the QUIC packets out to the peer.
    ///
    /// Packets are generated from every sendable connection into the packet
    /// queue and handed to the sender whenever a full batch is available.
    /// Once no connection is sendable any more, the timers of the connections
    /// that produced packets are refreshed and the remaining queued packets
    /// are flushed until the sender accepts none of them.
    ///
    /// # Errors
    ///
    /// Returns the first error reported while generating a packet (other
    /// than `Error::Done`) or by the sender. Later connections are not
    /// processed in that case.
    fn send_packets_out(&mut self) -> Result<()> {
        trace!(
            "{} process {} sendable connections",
            &self.trace_id,
            self.conn_sendable_len()
        );
        // Connections that generated at least one packet in this pass.
        let mut sent = FxHashSet::default();
        // Number of packets the sender reported as sent.
        let mut total = 0;

        while self.conn_sendable_len() > 0 {
            // Iterate over connections that have packets to send.
            // NOTE(review): an index without a matching connection is never
            // unmarked here — confirm that this cannot happen.
            while let Some(idx) = self.conn_sendable_next() {
                if let Some(conn) = self.conns.get_mut(idx) {
                    // Draining or closed connections must not send packets.
                    if conn.is_draining() || conn.is_closed() {
                        conn.mark_sendable(false);
                        continue;
                    }

                    let mut buf = self.packets.get_buffer();
                    match conn.send(&mut buf) {
                        Ok((len, info)) => {
                            // Keep only the bytes written by the connection.
                            buf.truncate(len);
                            self.packets.add_packet(buf, info);
                            sent.insert(idx);
                        }
                        Err(Error::Done) => {
                            self.packets.put_buffer(buf);
                            // When a connection runs out of packets to send,
                            // it is removed from the sendable queue.
                            conn.mark_sendable(false);
                        }
                        Err(e) => {
                            // Return the unused buffer to the pool before
                            // giving up.
                            self.packets.put_buffer(buf);
                            error!("{} generate packet err: {:?}", &self.trace_id, e);
                            return Err(e);
                        }
                    };
                }

                // Hand a full batch to the sender as soon as it is available.
                if self.packets.has_full_batch() {
                    let batch = self.packets.next_batch();
                    let done = match self.sender.on_packets_send(batch) {
                        Ok(v) => v,
                        Err(e) => {
                            error!("{} send packet err: {:?}", &self.trace_id, e);
                            return Err(e);
                        }
                    };
                    // Only the first `done` packets are removed from the
                    // queue; the rest stay queued.
                    self.packets.drain_front(done);
                    total += done;
                };
            }
        }

        // Try to update timers
        for idx in &sent {
            if let Some(conn) = self.conns.get_mut(*idx) {
                if let Some(t) = conn.timeout() {
                    self.timers.add(*idx, t, Instant::now());
                } else {
                    self.timers.del(idx);
                }
            }
        }

        // Try to send the remaining packets
        let mut batch = self.packets.next_batch();
        while !batch.is_empty() {
            trace!(
                "{} try to send remaining packets in packet queue",
                &self.trace_id,
            );

            let done = match self.sender.on_packets_send(batch) {
                Ok(v) => v,
                Err(e) => return Err(e),
            };
            // The sender accepted nothing: stop and leave the packets queued.
            if done == 0 {
                break;
            }
            total += done;
            self.packets.drain_front(done);
            batch = self.packets.next_batch();
        }

        trace!("{} send total {} packets out", &self.trace_id, total);
        Ok(())
    }

    /// Cease creating new connections and wait for all active connections to
    /// close.
    ///
    /// This only sets the `closed` flag; existing connections are left
    /// untouched.
    pub fn close(&mut self) {
        self.closed = true;
    }

    /// Replace the trace id of the endpoint, which prefixes its log messages.
    pub fn set_trace_id(&mut self, trace_id: String) {
        self.trace_id = trace_id;
    }

    /// Return the trace id currently used by the endpoint.
    pub fn trace_id(&self) -> &str {
        self.trace_id.as_str()
    }
}

/// ConnectionTable is used for storing QUIC connections.
///
/// Connections are boxed, so the address of a stored connection does not
/// change while it stays in the table (pointer stability), which makes the
/// FFI API easier to use.
struct ConnectionTable {
    /// Stored connections keyed by their index.
    conns: FxHashMap<u64, Box<Connection>>,

    /// Index handed out to the next inserted connection.
    next_index: u64,
}

impl ConnectionTable {
    /// Create an empty table whose first index is 0.
    fn new() -> Self {
        Self {
            next_index: 0,
            conns: FxHashMap::default(),
        }
    }

    /// Store a QUIC connection and return the index assigned to it.
    fn insert(&mut self, conn: Connection) -> u64 {
        let assigned = self.next_index;
        self.next_index = assigned + 1;

        self.conns.insert(assigned, Box::new(conn));
        assigned
    }

    /// Look up a QUIC connection by index.
    fn get_mut(&mut self, index: u64) -> Option<&mut Box<Connection>> {
        let key = index;
        self.conns.get_mut(&key)
    }

    /// Drop the QUIC connection stored under `index`, if any.
    fn remove(&mut self, index: u64) {
        let _removed = self.conns.remove(&index);
    }

    /// Number of connections currently stored.
    fn len(&self) -> usize {
        self.conns.len()
    }

    /// Whether the table holds no connection at all.
    fn is_empty(&self) -> bool {
        self.conns.len() == 0
    }
}

/// ConnectionRoutes is used for matching incoming packets to connections.
/// See RFC 9000 Section 5.2
struct ConnectionRoutes {
    /// Connections identified based on the locally created CID.
    cid_table: FxHashMap<ConnectionId, u64>,

    /// Connections (with zero-length CID) identified based on the address
    /// tuple.
    addr_table: HashMap<FourTuple, u64>,

    /// Connections identified based on the stateless reset token.
    token_table: FxHashMap<ResetToken, u64>,
}

impl ConnectionRoutes {
    /// Create a set of empty route tables.
    fn new() -> Self {
        Self {
            cid_table: FxHashMap::default(),
            addr_table: HashMap::default(),
            token_table: FxHashMap::default(),
        }
    }

    /// Find the target connection for the incoming datagram.
    ///
    /// The connection is looked up by DCID, or by address tuple when the
    /// DCID has zero length. If that fails and the datagram is longer than a
    /// reset token, the reset token parsed from the datagram is tried too.
    fn find(&self, dcid: &ConnectionId, buf: &mut [u8], info: &PacketInfo) -> Option<&u64> {
        let primary = if dcid.is_empty() {
            // The Destination Connection ID is zero length and the
            // addressing information in the packet matches the addressing
            // information the endpoint uses to identify a connection with a
            // zero-length connection ID.
            let addr = FourTuple {
                local: info.dst,
                remote: info.src,
            };
            self.addr_table.get(&addr)
        } else {
            // The packet has a non-zero-length Destination Connection ID
            // corresponding to an existing connection.
            self.cid_table.get(dcid)
        };

        if primary.is_some() || buf.len() <= crate::RESET_TOKEN_LEN {
            return primary;
        }

        // The endpoint identifies a received datagram as a Stateless Reset
        // by comparing the last 16 bytes of the datagram with all stateless
        // reset tokens.
        let token = ResetToken::from_bytes(buf).ok()?;
        self.token_table.get(&token)
    }

    /// Route packets carrying the local `cid` to connection `idx`.
    fn insert_with_cid(&mut self, cid: ConnectionId, idx: u64) {
        self.cid_table.insert(cid, idx);
    }

    /// Forget the route for the given cid.
    fn remove_with_cid(&mut self, cid: &ConnectionId) {
        self.cid_table.remove(cid);
    }

    /// Route packets on the address tuple `addr` to connection `idx`.
    fn insert_with_addr(&mut self, addr: FourTuple, idx: u64) {
        self.addr_table.insert(addr, idx);
    }

    /// Forget the route for the given address tuple.
    fn remove_with_addr(&mut self, addr: &FourTuple) {
        self.addr_table.remove(addr);
    }

    /// Route datagrams ending with the reset `token` to connection `idx`.
    fn insert_with_token(&mut self, token: ResetToken, idx: u64) {
        self.token_table.insert(token, idx);
    }

    /// Forget the route for the given reset token.
    fn remove_with_token(&mut self, token: &ResetToken) {
        self.token_table.remove(token);
    }

    /// Remove all routes for the connection
    fn remove(&mut self, conn: &Connection) {
        // Remove routes based on scid or address tuple
        if conn.zero_length_scid() {
            for path in conn.paths_iter() {
                self.remove_with_addr(&path);
            }
        } else {
            for entry in conn.scid_iter() {
                self.remove_with_cid(&entry.cid);
            }
        }

        // Remove routes based on original dcid
        if conn.is_server() {
            if let Some(odcid) = conn.odcid() {
                self.remove_with_cid(&odcid);
            }
        }

        // Remove routes based on stateless reset token
        if conn.zero_length_dcid() {
            return;
        }
        for raw_token in conn.dcid_iter().filter_map(|entry| entry.reset_token) {
            self.remove_with_token(&ResetToken(raw_token.to_be_bytes()));
        }
    }
}

const MAX_BUFFER_SIZE: usize = 2048;

/// PacketQueue is used for sending out packets in batches.
struct PacketQueue {
    /// Outgoing packets generated by the endpoint.
    packets: VecDeque<(Vec<u8>, PacketInfo)>,

    /// The batch size of outgoing packets.
    batch_size: usize,

    /// Send buffer pool.
    buffers: VecDeque<Vec<u8>>,
}

impl PacketQueue {
    /// Create an empty queue whose batches contain at most `batch_size`
    /// packets. The buffer pool starts out empty.
    fn new(batch_size: usize) -> Self {
        Self {
            packets: VecDeque::new(),
            batch_size,
            buffers: VecDeque::new(),
        }
    }

    /// Check whether the packet queue is empty.
    fn is_empty(&self) -> bool {
        self.packets.is_empty()
    }

    /// Add a packet to the back of the queue for sending in batches.
    fn add_packet(&mut self, pkt: Vec<u8>, info: PacketInfo) {
        self.packets.push_back((pkt, info));
    }

    /// Return the next batch of packets to send.
    ///
    /// The batch starts at the front of the queue and holds at most
    /// `batch_size` packets. The packets stay in the queue until
    /// `drain_front` is called.
    fn next_batch(&mut self) -> &[(Vec<u8>, PacketInfo)] {
        let batch_size = cmp::min(self.batch_size, self.packets.len());
        // The ring buffer must be contiguous to hand out a single slice.
        let packets = self.packets.make_contiguous();
        &packets[..batch_size]
    }

    /// Check whether the number of packets reaches batch_size.
    fn has_full_batch(&self) -> bool {
        self.packets.len() >= self.batch_size
    }

    /// Remove up to `n` packets from the front of the queue and put their
    /// buffers into the buffer pool.
    ///
    /// Each recycled buffer is resized to `MAX_BUFFER_SIZE`; bytes beyond its
    /// previous length are zero-filled, existing bytes are left as they are.
    fn drain_front(&mut self, n: usize) {
        let len = cmp::min(n, self.packets.len());
        let recycled = self.packets.drain(..len).map(|(mut buf, _)| {
            buf.resize(MAX_BUFFER_SIZE, 0);
            buf
        });
        self.buffers.extend(recycled);
    }

    /// Get a packet buffer from the buffer pool.
    ///
    /// A zeroed buffer of `MAX_BUFFER_SIZE` bytes is allocated when the pool
    /// is empty. Pooled buffers are returned with whatever content they had.
    fn get_buffer(&mut self) -> Vec<u8> {
        self.buffers
            .pop_front()
            .unwrap_or_else(|| vec![0; MAX_BUFFER_SIZE])
    }

    /// Put a packet buffer back into the buffer pool.
    ///
    /// The buffer is resized to `MAX_BUFFER_SIZE` before it is stored.
    fn put_buffer(&mut self, mut buf: Vec<u8>) {
        buf.resize(MAX_BUFFER_SIZE, 0);
        self.buffers.push_back(buf);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection;
    use crate::Config;
    use crate::Error;
    use crate::TlsConfig;
    use bytes::Buf;
    use connection::tests::TestPair as TestTool;
    use mio;
    use rand::prelude::SliceRandom;
    use rand::rngs::mock::StepRng;
    use rand::RngCore;
    use ring::aead;
    use ring::aead::LessSafeKey;
    use ring::aead::UnboundKey;
    use std::cmp;
    use std::net::IpAddr;
    use std::net::Ipv4Addr;
    use std::net::SocketAddr;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::thread;
    use std::thread::JoinHandle;
    use std::time::Duration;

    /// Result type used by the tests in this module: the error is any boxed
    /// `std::error::Error`. Being declared here, it takes precedence over the
    /// `Result` pulled in by `use super::*`.
    type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

    /// Test harness that runs a client endpoint and a server endpoint, each in
    /// its own thread.
    struct TestPair {
        /// Flag shared with both endpoint threads; their event loops exit once
        /// it is set to `true`.
        stop: Arc<AtomicBool>,
    }

    impl TestPair {
        /// Create a harness whose stop flag is initially cleared.
        fn new() -> Self {
            let stop = Arc::new(AtomicBool::new(false));
            Self { stop }
        }

        /// Run client/server endpoint in two threads.
        ///
        /// `cli_conf` and `srv_conf` are the endpoint configurations of the
        /// client and the server; `case_conf` controls the test handlers and
        /// the fault injection of both sockets.
        ///
        /// Returns `Ok(())` after both threads have been joined. A failure to
        /// read a socket's local address is returned as an error; most other
        /// failures are unwrapped and therefore panic.
        fn run(&mut self, cli_conf: Config, srv_conf: Config, case_conf: CaseConf) -> Result<()> {
            // Exit if client/server thread panic
            // The hook prints the panic payload and location, then terminates
            // the whole process with exit code -1.
            std::panic::set_hook(Box::new(|panic_info| {
                println!(
                    "unexpected panic {:?} occurred in {:?} and exit",
                    panic_info.payload().downcast_ref::<&str>(),
                    panic_info.location()
                );
                std::process::exit(-1);
            }));

            // Prepare client/server sockets and config
            let mut cli_poll = mio::Poll::new().unwrap();
            let cli_sock =
                TestSocket::new(cli_poll.registry(), &case_conf, "CLIENT".into()).unwrap();
            let cli_addr = cli_sock.local_addr()?;
            let cli_case_conf = case_conf.clone();
            let cli_stop = Arc::clone(&self.stop);

            let mut srv_poll = mio::Poll::new().unwrap();
            let srv_sock =
                TestSocket::new(srv_poll.registry(), &case_conf, "SERVER".into()).unwrap();
            let srv_addr = srv_sock.local_addr()?;
            let srv_case_conf = case_conf.clone();
            let srv_stop = Arc::clone(&self.stop);

            // Arguments for the client's `connect` call.
            let host = Some("example.org");
            let session = cli_case_conf.session.clone();
            let token = cli_case_conf.token.clone();

            // Create and run client endpoint in child thread
            let client = thread::spawn(move || {
                // Borrow the owned session/token bytes as optional slices.
                let session = session.as_ref().map(Vec::as_ref);
                let token = token.as_ref().map(Vec::as_ref);

                let cli_hdl = Box::new(ClientHandler::new(cli_case_conf, Arc::clone(&cli_stop)));
                // The `Rc` is created inside the thread, after the socket has
                // been moved into it.
                let cli_sock = Rc::new(cli_sock);
                let mut endpoint =
                    Endpoint::new(Box::new(cli_conf), false, cli_hdl, cli_sock.clone());

                endpoint
                    .connect(cli_addr, srv_addr, host, session, token)
                    .unwrap();
                // Process the new connection once before entering the loop.
                endpoint.process_connections().unwrap();
                TestPair::event_loop(&mut endpoint, &mut cli_poll, &cli_sock, &cli_stop).unwrap();
            });

            // Create and run server endpoint in child thread
            let server = thread::spawn(move || {
                let srv_hdl = Box::new(ServerHandler::new(srv_case_conf, Arc::clone(&srv_stop)));
                let srv_sock = Rc::new(srv_sock);

                let mut endpoint =
                    Endpoint::new(Box::new(srv_conf), true, srv_hdl, srv_sock.clone());
                TestPair::event_loop(&mut endpoint, &mut srv_poll, &srv_sock, &srv_stop).unwrap();
            });

            // Wait for both event loops to observe the stop flag and return.
            client.join().unwrap();
            server.join().unwrap();
            Ok(())
        }

        /// Run the client/server pair with the configuration produced by
        /// `new_test_config` for each side.
        fn run_with_test_config(&mut self, hdr_opt: CaseConf) -> Result<()> {
            self.run(
                TestPair::new_test_config(false)?,
                TestPair::new_test_config(true)?,
                hdr_opt,
            )
        }

        /// Drive the endpoint until the shared stop flag is set.
        ///
        /// Every iteration polls for socket events, bounded by the endpoint's
        /// next timeout. An empty event set is treated as a timer expiry,
        /// otherwise readable events are processed. Afterwards the endpoint's
        /// connections are processed; errors from that step are only traced.
        fn event_loop(
            e: &mut Endpoint,
            poll: &mut mio::Poll,
            sock: &TestSocket,
            stop: &Arc<AtomicBool>,
        ) -> Result<()> {
            let mut events = mio::Events::with_capacity(1024);

            while !stop.load(Ordering::Relaxed) {
                // Never wait for less than the timer granularity. Without any
                // pending timer, still wake up regularly to check `stop`.
                let timeout = if let Some(v) = e.timeout() {
                    cmp::max(v, crate::TIMER_GRANULARITY)
                } else {
                    trace!("{} event loop: no expirable connection.", e.trace_id());
                    crate::TIMER_GRANULARITY * 100
                };

                trace!(
                    "{} event loop: poll events (timeout: {:?})...",
                    e.trace_id(),
                    timeout,
                );
                poll.poll(&mut events, Some(timeout))?;

                if events.is_empty() {
                    // Nothing became ready: the poll timed out.
                    trace!("{} event loop: process timeout events", e.trace_id());
                    e.on_timeout(Instant::now());
                } else {
                    for event in events.iter() {
                        trace!("{} event loop: process io events", e.trace_id());
                        if event.is_readable() {
                            TestPair::process_read_event(e, sock)?;
                        }
                    }
                }

                // Common tail for both the timeout and the IO case.
                if let Err(err) = e.process_connections() {
                    trace!(
                        "{} event loop: process_connections(): {:?}",
                        e.trace_id(),
                        err
                    );
                }
            }

            trace!("{} event loop exit", e.trace_id());
            Ok(())
        }

        /// Read datagrams from the socket until it would block and feed each
        /// of them to the endpoint.
        ///
        /// A packet the endpoint fails to process is logged and dropped. Any
        /// socket error other than `WouldBlock` aborts with an error.
        fn process_read_event(e: &mut Endpoint, s: &TestSocket) -> Result<()> {
            let mut recv_buf = vec![0; 65535];
            loop {
                // Read datagram from the socket.
                let (len, remote) = match s.socket.recv_from(&mut recv_buf) {
                    Ok(v) => v,
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        trace!("{} socket recv would block, stop recving", e.trace_id());
                        break;
                    }
                    Err(err) => return Err(format!("socket recv error: {:?}", err).into()),
                };

                // Process the incoming packet.
                let pkt_info = PacketInfo {
                    src: remote,
                    dst: s.socket.local_addr().unwrap(),
                    time: Instant::now(),
                };
                if let Err(err) = e.recv(&mut recv_buf[..len], &pkt_info) {
                    error!("{} recv failed: {:?}, drop the packet", e.trace_id, err);
                }
            }
            Ok(())
        }

        /// Build an encoded resume address token for `client_ip` (port 0),
        /// using `key` as an AES-128-GCM key.
        ///
        /// Panics if the key is rejected or the token cannot be encoded.
        fn new_test_address_token(client_ip: IpAddr, key: &[u8]) -> Vec<u8> {
            let token = AddressToken::new_resume_token(SocketAddr::new(client_ip, 0));

            let unbound_key = UnboundKey::new(&aead::AES_128_GCM, key).unwrap();
            token.encode(&LessSafeKey::new(unbound_key)).unwrap()
        }

        /// Create the endpoint configuration used by the tests in this module.
        ///
        /// `is_server` selects the TLS setup: a server gets the test
        /// certificate/key pair and a fixed ticket key, a client gets a plain
        /// client configuration. Both sides use the ALPN list `["h3"]`.
        fn new_test_config(is_server: bool) -> Result<Config> {
            let mut conf = Config::new()?;
            conf.set_initial_max_data(10 * 1024 * 1024);
            conf.set_initial_max_stream_data_bidi_local(1024 * 1024);
            conf.set_initial_max_stream_data_bidi_remote(1024 * 1024);
            conf.set_initial_max_stream_data_uni(1024 * 1024);
            conf.set_initial_max_streams_bidi(100);
            conf.set_initial_max_streams_uni(100);
            conf.set_max_idle_timeout(10000);
            conf.set_recv_udp_payload_size(1500);
            conf.set_max_connection_window(10 * 1024 * 1024);
            conf.set_max_stream_window(1024 * 1024);
            conf.set_max_concurrent_conns(100);
            conf.set_active_connection_id_limit(4);
            conf.set_ack_delay_exponent(3);
            conf.set_max_ack_delay(25);
            conf.set_reset_token_key([1u8; 64]);
            conf.set_send_batch_size(16);
            conf.set_initial_rtt(33); // for reducing test case execution time

            // NOTE(review): this function used to set the handshake timeout to
            // 5000 first and then override it with 0 here. Only the effective
            // value is kept. Presumably 0 disables the handshake timeout;
            // confirm that this is the intended test setting.
            conf.set_max_handshake_timeout(0);

            let application_protos = vec![b"h3".to_vec()];
            let tls_config = if is_server {
                let mut tls_config = TlsConfig::new_server_config(
                    "src/tls/testdata/cert.crt",
                    "src/tls/testdata/cert.key",
                    application_protos,
                    true,
                )?;
                // Fixed ticket key shared by all server configs built here.
                tls_config.set_ticket_key(&vec![0x73; 48])?;
                tls_config
            } else {
                TlsConfig::new_client_config(application_protos, true)?
            };
            conf.set_tls_config(tls_config);

            Ok(conf)
        }

        /// Create test session data using default ticket key
        pub fn new_test_session_state() -> Vec<u8> {
            let mut client_config = TestPair::new_test_config(false).unwrap();
            let mut server_config = TestPair::new_test_config(true).unwrap();

            let mut test_pair =
                connection::tests::TestPair::new(&mut client_config, &mut server_config).unwrap();
            test_pair.handshake().unwrap();
            let session = test_pair.client.session().unwrap();

            trace!(
                "Extract session state for resumed handshake: {} bytes",
                session.len()
            );
            session.to_vec()
        }
    }

    /// UdpSocket with fault injection.
    ///
    /// The fault rates and the delay are copied from the `CaseConf` passed to
    /// `TestSocket::new`. Rates are compared against a sample in `0..100`, so
    /// a rate of 0 disables the corresponding fault.
    struct TestSocket {
        /// The underlying non-blocking UDP socket.
        socket: mio::net::UdpSocket,

        /// Rng for testing purposes.
        /// TODO: use custom deterministic rng
        rng: RefCell<StepRng>,

        /// Used for simulating packet loss (0~100)
        packet_loss: u32,

        /// Used for simulating one way delay (ms)
        packet_delay: u32,

        /// Used for simulating out-of-order packet (0~100)
        packet_reorder: u32,

        /// Used for simulating duplicated packet (0~100)
        packet_duplication: u32,

        /// Used for simulating corrupted packet (0~100)
        packet_corruption: u32,

        /// Unique trace id, used as prefix of the socket's trace messages.
        trace_id: String,
    }

    impl TestSocket {
        /// Create a UdpSocket using random unused port.
        fn new(reg: &mio::Registry, conf: &CaseConf, trace_id: String) -> Result<Self> {
            let addr: SocketAddr = "127.8.8.8:0".parse().unwrap();
            let mut socket = mio::net::UdpSocket::bind(addr)?;

            const TOKEN: mio::Token = mio::Token(0);
            reg.register(&mut socket, TOKEN, mio::Interest::READABLE)
                .unwrap();

            let rng = RefCell::new(StepRng::new(0, 1));

            Ok(Self {
                socket,
                rng,
                packet_loss: conf.packet_loss,
                packet_delay: conf.packet_delay,
                packet_reorder: conf.packet_reorder,
                packet_duplication: conf.packet_duplication,
                packet_corruption: conf.packet_corruption,
                trace_id,
            })
        }

        /// Return the local address the underlying socket is bound to, or the
        /// I/O error reported by the socket.
        fn local_addr(&self) -> std::io::Result<SocketAddr> {
            self.socket.local_addr()
        }

        /// Draw the next value from the socket's rng and report whether an
        /// abnormal event should be injected for the given `rate` (0~100).
        fn sampling(&self, rate: u32) -> bool {
            let roll = self.rng.borrow_mut().next_u32() % 100;
            roll < rate
        }

        /// Keep only the leading packets that have been delayed long enough.
        ///
        /// The list is cut at the first packet whose `info.time` is less than
        /// `packet_delay` milliseconds in the past (or lies in the future);
        /// that packet and everything after it is withheld. With a delay of 0
        /// the list is returned untouched.
        fn try_delay_packets(
            &self,
            mut pkts: Vec<(Vec<u8>, PacketInfo)>,
        ) -> Vec<(Vec<u8>, PacketInfo)> {
            if self.packet_delay == 0 {
                return pkts;
            }

            let now = Instant::now();
            let delay = Duration::from_millis(self.packet_delay as u64);
            let count = pkts.len();

            // A packet is not ready while its age is unknown or below `delay`.
            let not_ready = |info: &PacketInfo| match now.checked_duration_since(info.time) {
                Some(age) => age < delay,
                None => true,
            };
            if let Some(first) = pkts.iter().position(|(_, info)| not_ready(info)) {
                pkts.truncate(first);
            }

            trace!(
                "{} {} out of {} packets is DELAYED !",
                &self.trace_id,
                count - pkts.len(),
                count
            );
            pkts
        }

        /// Randomly reorder packets.
        fn try_reorder_packets(&self, pkts: &mut [(Vec<u8>, PacketInfo)], window: usize) {
            if self.packet_reorder == 0 || !self.sampling(self.packet_reorder) {
                return;
            }

            let mut start = 0;
            while start < pkts.len() {
                let end = cmp::min(start + window, pkts.len());
                let range = &mut pkts[start..end];
                range.shuffle(&mut *self.rng.borrow_mut());
                start = end;
            }
            trace!(
                "{} {} packets REORDERD ! reorder threshold: {}",
                &self.trace_id,
                pkts.len(),
                window,
            );
        }

        /// Decide whether the packet should be dropped to simulate loss.
        ///
        /// Returns `true` when the loss event is sampled; the drop is traced.
        /// No sample is drawn when `packet_loss` is 0.
        fn try_drop_packet(&self, pkt: &[u8], info: &PacketInfo) -> bool {
            let lost = self.packet_loss != 0 && self.sampling(self.packet_loss);
            if lost {
                trace!(
                    "{} write {} bytes {:?} but it is LOST !",
                    &self.trace_id,
                    pkt.len(),
                    info,
                );
            }
            lost
        }

        /// Randomly change one byte of the packet data.
        ///
        /// When the corruption event is sampled, a random position of `pkt` is
        /// overwritten with a random byte (which may equal the old value).
        /// Empty packets are left alone.
        fn try_mangle_packet(&self, pkt: &mut [u8], info: &PacketInfo) {
            if self.packet_corruption == 0 || !self.sampling(self.packet_corruption) {
                return;
            }
            // Checked after sampling so the rng sequence does not depend on
            // the packet length; an empty packet has no byte to corrupt and
            // would make the modulo below panic.
            if pkt.is_empty() {
                return;
            }

            let idx = (rand::random::<u32>() as usize) % pkt.len();
            pkt[idx] = rand::random::<u8>();
            trace!(
                "{} write {} bytes {:?} but it is CORRUPTED !",
                &self.trace_id,
                pkt.len(),
                info,
            );
        }

        /// Randomly duplicate the packet.
        ///
        /// When the duplication event is sampled, the packet is sent once more
        /// to `info.dst`. The outcome of that extra send is ignored.
        fn try_dup_packet(&self, pkt: &[u8], info: &PacketInfo) {
            let duplicate =
                self.packet_duplication != 0 && self.sampling(self.packet_duplication);
            if !duplicate {
                return;
            }

            // Best effort: a failed duplicate is simply not delivered.
            let _ = self.socket.send_to(pkt, info.dst);
            trace!(
                "{} write {} bytes {:?} but it is DUPLICATED !",
                &self.trace_id,
                pkt.len(),
                info,
            );
        }
    }

    impl PacketSendHandler for TestSocket {
        /// Send the given packets through the socket, applying the configured
        /// faults (delay, reorder, loss, corruption, duplication) on the way.
        ///
        /// Returns the number of packets handled, where a deliberately dropped
        /// packet counts as handled. Sending stops early with the current
        /// count when the socket would block; any other socket error is
        /// returned as `Error::InvalidOperation`.
        fn on_packets_send(&self, pkts: &[(Vec<u8>, PacketInfo)]) -> crate::Result<usize> {
            // Simulate event of packet delay
            let mut ready = self.try_delay_packets(pkts.to_vec());
            if ready.is_empty() {
                return Ok(0);
            }

            // Simulate event of packet reorder
            self.try_reorder_packets(&mut ready, 4);

            let mut handled = 0;
            for (mut pkt, info) in ready {
                // Simulate event of packet loss
                if self.try_drop_packet(&pkt, &info) {
                    handled += 1;
                    continue;
                }

                // Simulate event of packet corruption
                self.try_mangle_packet(&mut pkt, &info);

                // Send the packet out.
                match self.socket.send_to(&pkt, info.dst) {
                    Ok(_) => {}
                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        return Ok(handled);
                    }
                    Err(e) => {
                        return Err(crate::Error::InvalidOperation(
                            format!("send_to(): {:?}", e).into(),
                        ));
                    }
                }
                trace!("{} write {} bytes {:?}", &self.trace_id, pkt.len(), info);
                handled += 1;

                // Simulate event of packet duplication
                self.try_dup_packet(&pkt, &info);
            }

            Ok(handled)
        }
    }

    // A mocked socket which implements PacketSendHandler.
    // Nothing is sent anywhere: packets are only recorded.
    struct MockSocket {
        // Every packet passed to `on_packets_send`, in the order received.
        packets: RefCell<Vec<(Vec<u8>, PacketInfo)>>,
    }

    impl MockSocket {
        fn new() -> Self {
            MockSocket {
                packets: RefCell::new(Vec::new()),
            }
        }
    }

    impl PacketSendHandler for MockSocket {
        /// Record a copy of every packet and report all of them as sent.
        fn on_packets_send(&self, pkts: &[(Vec<u8>, PacketInfo)]) -> crate::Result<usize> {
            self.packets.borrow_mut().extend_from_slice(pkts);
            Ok(pkts.len())
        }
    }

    /// Per-test-case settings shared by the client handler, the server
    /// handler and the fault-injecting sockets.
    #[derive(Clone, Default)]
    struct CaseConf {
        /// The server closes the connection as soon as it is established.
        handshake_only: bool,
        /// The client asserts that the new connection is (not) in early data
        /// and, if set, writes its requests right after connection creation.
        client_0rtt_expected: bool,
        /// The client asserts that the established connection is (not) resumed.
        resumption_expected: bool,
        /// The client asserts on close that it has received a new token.
        new_token_expected: bool,
        /// Optional session bytes passed to the client's `connect` call.
        session: Option<Vec<u8>>,
        /// Optional token bytes passed to the client's `connect` call.
        token: Option<Vec<u8>>,
        /// Number of requests the client sends.
        request_num: u32,
        /// Size in bytes of each request.
        request_size: usize,
        /// Packet loss rate of the test sockets (0~100).
        packet_loss: u32,
        /// One way delay of the test sockets (ms).
        packet_delay: u32,
        /// Packet reorder rate of the test sockets (0~100).
        packet_reorder: u32,
        /// Packet duplication rate of the test sockets (0~100).
        packet_duplication: u32,
        /// Packet corruption rate of the test sockets (0~100).
        packet_corruption: u32,
    }

    /// Client side transport handler: sends the configured requests and
    /// checks that each response equals the request data.
    #[derive(Default)]
    struct ClientHandler {
        /// Random payload of all requests, concatenated in request order.
        data: Vec<u8>,
        /// Per request: the part of the request that has not been written yet.
        reqs: Vec<bytes::Bytes>,
        /// Per request: the response bytes received so far.
        rsps: Vec<bytes::BytesMut>,
        /// Number of responses that have been completely received.
        count: u16,
        /// Settings of the running test case.
        conf: CaseConf,
        /// The most recent token delivered through `on_new_token`, if any.
        token: Option<Vec<u8>>,
        /// Set to `true` when the connection is closed.
        stop: Arc<AtomicBool>,
    }

    impl ClientHandler {
        /// Create a handler with `conf.request_num` requests of
        /// `conf.request_size` random bytes each and one empty response buffer
        /// per request.
        fn new(conf: CaseConf, stop: Arc<AtomicBool>) -> Self {
            let req_size = conf.request_size;
            let req_num = conf.request_num as usize;

            // One random blob; request `i` is its i-th `req_size` slice.
            let mut data = vec![0u8; req_size * req_num];
            rand::thread_rng().fill_bytes(&mut data);

            let reqs = (0..req_num)
                .map(|i| {
                    let start = i * req_size;
                    bytes::Bytes::copy_from_slice(&data[start..start + req_size])
                })
                .collect();
            let rsps = (0..req_num).map(|_| bytes::BytesMut::new()).collect();

            Self {
                data,
                reqs,
                rsps,
                count: 0,
                conf,
                token: None,
                stop,
            }
        }

        /// Write as much of request `req_id` as the stream currently accepts.
        ///
        /// Request `i` uses stream `4 * i`. The written part is removed from
        /// the pending request, and `fin` is set once nothing is left.
        fn write_request(&mut self, conn: &mut Connection, req_id: usize) {
            let stream_id = (req_id * 4) as u64;
            let cap = conn.stream_capacity(stream_id).unwrap();

            let pending = &mut self.reqs[req_id];
            let len = cmp::min(pending.len(), cap);
            let chunk = pending.split_to(len);
            let fin = pending.is_empty();

            conn.stream_write(stream_id, chunk, fin).unwrap();
        }
    }

    impl TransportHandler for ClientHandler {
        /// Check the early-data expectation and, when 0-RTT is expected,
        /// start writing all requests right away.
        fn on_conn_created(&mut self, conn: &mut Connection) {
            assert_eq!(conn.is_in_early_data(), self.conf.client_0rtt_expected);
            trace!("{} connection created", conn.trace_id());

            // Try to send requests to the server (0RTT)
            if self.conf.client_0rtt_expected {
                for i in 0..self.reqs.len() {
                    let stream_id = (i * 4) as u64;
                    conn.stream_set_priority(stream_id, 1, false).unwrap();
                    self.write_request(conn, i);
                }
            }
        }

        /// Check the resumption expectation and write all requests (or what
        /// is left of them).
        fn on_conn_established(&mut self, conn: &mut Connection) {
            trace!("{} connection established", conn.trace_id());
            assert_eq!(conn.is_resumed(), self.conf.resumption_expected);

            // Try to send requests to the server (1RTT)
            for i in 0..self.reqs.len() {
                let stream_id = (i * 4) as u64;
                conn.stream_set_priority(stream_id, 1, false).unwrap();
                self.write_request(conn, i);
            }
        }

        /// Verify the final connection state and signal the event loops to
        /// stop.
        fn on_conn_closed(&mut self, conn: &mut Connection) {
            trace!("{} connection closed", conn.trace_id());
            assert!(conn.is_established());
            if self.conf.new_token_expected {
                assert!(self.token.is_some());
            }
            self.stop.store(true, Ordering::Relaxed);
        }

        /// Only traces the event.
        fn on_stream_created(&mut self, conn: &mut Connection, stream_id: u64) {
            trace!("{} stream {} created", conn.trace_id(), stream_id);
        }

        /// Read up to 2048 response bytes, validate a finished response
        /// against the request data and close the connection once every
        /// response has been received.
        fn on_stream_readable(&mut self, conn: &mut Connection, stream_id: u64) {
            // Stream `4 * i` carries request/response `i`.
            let idx = (stream_id / 4) as usize;
            let mut buf = vec![0; 2048];

            // Read response from the server
            let (len, fin) = conn.stream_read(stream_id, &mut buf).unwrap();
            self.rsps[idx].extend_from_slice(&buf[..len]);

            // Validate the response
            if fin {
                self.count += 1;
                let len = self.rsps[idx].len();
                let rsp = self.rsps[idx].split_to(len).freeze();

                let start = idx * self.conf.request_size;
                let raw = &self.data[start..start + self.conf.request_size];
                assert_eq!(raw, &rsp);
            }

            // Close connection when all requests has been processed
            if self.count as usize == self.reqs.len() {
                conn.close(true, 0, &[]).unwrap();
                trace!("client close connection");
            }
        }

        /// Continue writing the request that belongs to the stream.
        fn on_stream_writable(&mut self, conn: &mut Connection, stream_id: u64) {
            let i = (stream_id / 4) as usize;
            self.write_request(conn, i);
        }

        /// Only traces the event.
        fn on_stream_closed(&mut self, conn: &mut Connection, stream_id: u64) {
            trace!("{} stream {} closed", conn.trace_id(), stream_id);
        }

        /// Remember the token; `on_conn_closed` checks for it when the test
        /// case expects one.
        fn on_new_token(&mut self, conn: &mut Connection, token: Vec<u8>) {
            self.token = Some(token);
        }
    }

    /// Per-stream state the server handler attaches to each stream.
    struct ServerStreamContext {
        /// Data read from the stream that has not been echoed back yet.
        buf: bytes::BytesMut,
        /// The `fin` flag returned by the most recent read on the stream.
        fin: bool,
    }

    /// Server side transport handler: echoes stream data back to the client.
    struct ServerHandler {
        /// Close the connection as soon as it is established.
        handshake_only: bool,
        /// Shared stop flag. It is stored here but not read or written by the
        /// handler methods in this module.
        stop: Arc<AtomicBool>,
    }

    impl ServerHandler {
        fn new(conf: CaseConf, stop: Arc<AtomicBool>) -> Self {
            Self {
                handshake_only: conf.handshake_only,
                stop,
            }
        }
    }

    impl TransportHandler for ServerHandler {
        /// Only traces the event.
        fn on_conn_created(&mut self, conn: &mut Connection) {
            trace!("{} connection created", conn.trace_id());
        }

        /// Close the connection right away for handshake-only test cases.
        fn on_conn_established(&mut self, conn: &mut Connection) {
            trace!("{} connection established", conn.trace_id());
            if !self.handshake_only {
                return;
            }
            conn.close(true, 0, &[]).unwrap();
        }

        /// Only traces the event.
        fn on_conn_closed(&mut self, conn: &mut Connection) {
            trace!("{} connection closed", conn.trace_id());
        }

        /// Attach an empty echo context to the new stream.
        fn on_stream_created(&mut self, conn: &mut Connection, stream_id: u64) {
            conn.stream_set_context(
                stream_id,
                ServerStreamContext {
                    buf: bytes::BytesMut::new(),
                    fin: false,
                },
            )
            .unwrap();
        }

        /// Read up to 2048 bytes from the stream and append them to the
        /// stream's echo buffer, recording the `fin` flag of this read.
        fn on_stream_readable(&mut self, conn: &mut Connection, stream_id: u64) {
            let mut chunk = vec![0; 2048];
            let (len, fin) = conn.stream_read(stream_id, &mut chunk).unwrap();

            let ctx = conn.stream_context(stream_id).unwrap();
            let echo = ctx.downcast_mut::<ServerStreamContext>().unwrap();
            echo.buf.extend_from_slice(&chunk[..len]);
            echo.fin = fin;
        }

        /// Echo buffered data back to the client, limited by the stream's
        /// current capacity. `fin` is sent together with the last buffered
        /// bytes once the client's `fin` has been seen.
        fn on_stream_writable(&mut self, conn: &mut Connection, stream_id: u64) {
            let cap = conn.stream_capacity(stream_id).unwrap();

            // Take the next chunk out of the echo buffer. The context borrow
            // ends with this block so the connection can be written to.
            let (chunk, fin) = {
                let ctx = conn.stream_context(stream_id).unwrap();
                let echo = ctx.downcast_mut::<ServerStreamContext>().unwrap();
                if echo.buf.is_empty() {
                    return;
                }
                let len = cmp::min(cap, echo.buf.len());
                let chunk = echo.buf.split_to(len).freeze();
                (chunk, echo.fin && echo.buf.is_empty())
            };

            // Send data to the client
            conn.stream_write(stream_id, chunk, fin).unwrap();
        }

        /// Only traces the event.
        fn on_stream_closed(&mut self, conn: &mut Connection, stream_id: u64) {
            trace!("{} stream {} closed", conn.trace_id(), stream_id);
        }

        /// Tokens are ignored on the server side.
        fn on_new_token(&mut self, conn: &mut Connection, token: Vec<u8>) {}
    }

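    // Test Initial packet: 700 bytes of a long-header packet for QUIC version 1
    // with destination connection id 0x8394c8f03e515708. The leading bytes
    // match the sample client Initial packet in RFC 9001, Appendix A.2.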
    const TEST_INITIAL: [u8; 700] = [
        0xc0, 0x00, 0x00, 0x00, 0x01, 0x08, 0x83, 0x94, 0xc8, 0xf0, 0x3e, 0x51, 0x57, 0x08, 0x00,
        0x00, 0x44, 0x9e, 0x7b, 0x9a, 0xec, 0x34, 0xd1, 0xb1, 0xc9, 0x8d, 0xd7, 0x68, 0x9f, 0xb8,
        0xec, 0x11, 0xd2, 0x42, 0xb1, 0x23, 0xdc, 0x9b, 0xd8, 0xba, 0xb9, 0x36, 0xb4, 0x7d, 0x92,
        0xec, 0x35, 0x6c, 0x0b, 0xab, 0x7d, 0xf5, 0x97, 0x6d, 0x27, 0xcd, 0x44, 0x9f, 0x63, 0x30,
        0x00, 0x99, 0xf3, 0x99, 0x1c, 0x26, 0x0e, 0xc4, 0xc6, 0x0d, 0x17, 0xb3, 0x1f, 0x84, 0x29,
        0x15, 0x7b, 0xb3, 0x5a, 0x12, 0x82, 0xa6, 0x43, 0xa8, 0xd2, 0x26, 0x2c, 0xad, 0x67, 0x50,
        0x0c, 0xad, 0xb8, 0xe7, 0x37, 0x8c, 0x8e, 0xb7, 0x53, 0x9e, 0xc4, 0xd4, 0x90, 0x5f, 0xed,
        0x1b, 0xee, 0x1f, 0xc8, 0xaa, 0xfb, 0xa1, 0x7c, 0x75, 0x0e, 0x2c, 0x7a, 0xce, 0x01, 0xe6,
        0x00, 0x5f, 0x80, 0xfc, 0xb7, 0xdf, 0x62, 0x12, 0x30, 0xc8, 0x37, 0x11, 0xb3, 0x93, 0x43,
        0xfa, 0x02, 0x8c, 0xea, 0x7f, 0x7f, 0xb5, 0xff, 0x89, 0xea, 0xc2, 0x30, 0x82, 0x49, 0xa0,
        0x22, 0x52, 0x15, 0x5e, 0x23, 0x47, 0xb6, 0x3d, 0x58, 0xc5, 0x45, 0x7a, 0xfd, 0x84, 0xd0,
        0x5d, 0xff, 0xfd, 0xb2, 0x03, 0x92, 0x84, 0x4a, 0xe8, 0x12, 0x15, 0x46, 0x82, 0xe9, 0xcf,
        0x01, 0x2f, 0x90, 0x21, 0xa6, 0xf0, 0xbe, 0x17, 0xdd, 0xd0, 0xc2, 0x08, 0x4d, 0xce, 0x25,
        0xff, 0x9b, 0x06, 0xcd, 0xe5, 0x35, 0xd0, 0xf9, 0x20, 0xa2, 0xdb, 0x1b, 0xf3, 0x62, 0xc2,
        0x3e, 0x59, 0x6d, 0x11, 0xa4, 0xf5, 0xa6, 0xcf, 0x39, 0x48, 0x83, 0x8a, 0x3a, 0xec, 0x4e,
        0x15, 0xda, 0xf8, 0x50, 0x0a, 0x6e, 0xf6, 0x9e, 0xc4, 0xe3, 0xfe, 0xb6, 0xb1, 0xd9, 0x8e,
        0x61, 0x0a, 0xc8, 0xb7, 0xec, 0x3f, 0xaf, 0x6a, 0xd7, 0x60, 0xb7, 0xba, 0xd1, 0xdb, 0x4b,
        0xa3, 0x48, 0x5e, 0x8a, 0x94, 0xdc, 0x25, 0x0a, 0xe3, 0xfd, 0xb4, 0x1e, 0xd1, 0x5f, 0xb6,
        0xa8, 0xe5, 0xeb, 0xa0, 0xfc, 0x3d, 0xd6, 0x0b, 0xc8, 0xe3, 0x0c, 0x5c, 0x42, 0x87, 0xe5,
        0x38, 0x05, 0xdb, 0x05, 0x9a, 0xe0, 0x64, 0x8d, 0xb2, 0xf6, 0x42, 0x64, 0xed, 0x5e, 0x39,
        0xbe, 0x2e, 0x20, 0xd8, 0x2d, 0xf5, 0x66, 0xda, 0x8d, 0xd5, 0x99, 0x8c, 0xca, 0xbd, 0xae,
        0x05, 0x30, 0x60, 0xae, 0x6c, 0x7b, 0x43, 0x78, 0xe8, 0x46, 0xd2, 0x9f, 0x37, 0xed, 0x7b,
        0x4e, 0xa9, 0xec, 0x5d, 0x82, 0xe7, 0x96, 0x1b, 0x7f, 0x25, 0xa9, 0x32, 0x38, 0x51, 0xf6,
        0x81, 0xd5, 0x82, 0x36, 0x3a, 0xa5, 0xf8, 0x99, 0x37, 0xf5, 0xa6, 0x72, 0x58, 0xbf, 0x63,
        0xad, 0x6f, 0x1a, 0x0b, 0x1d, 0x96, 0xdb, 0xd4, 0xfa, 0xdd, 0xfc, 0xef, 0xc5, 0x26, 0x6b,
        0xa6, 0x61, 0x17, 0x22, 0x39, 0x5c, 0x90, 0x65, 0x56, 0xbe, 0x52, 0xaf, 0xe3, 0xf5, 0x65,
        0x63, 0x6a, 0xd1, 0xb1, 0x7d, 0x50, 0x8b, 0x73, 0xd8, 0x74, 0x3e, 0xeb, 0x52, 0x4b, 0xe2,
        0x2b, 0x3d, 0xcb, 0xc2, 0xc7, 0x46, 0x8d, 0x54, 0x11, 0x9c, 0x74, 0x68, 0x44, 0x9a, 0x13,
        0xd8, 0xe3, 0xb9, 0x58, 0x11, 0xa1, 0x98, 0xf3, 0x49, 0x1d, 0xe3, 0xe7, 0xfe, 0x94, 0x2b,
        0x33, 0x04, 0x07, 0xab, 0xf8, 0x2a, 0x4e, 0xd7, 0xc1, 0xb3, 0x11, 0x66, 0x3a, 0xc6, 0x98,
        0x90, 0xf4, 0x15, 0x70, 0x15, 0x85, 0x3d, 0x91, 0xe9, 0x23, 0x03, 0x7c, 0x22, 0x7a, 0x33,
        0xcd, 0xd5, 0xec, 0x28, 0x1c, 0xa3, 0xf7, 0x9c, 0x44, 0x54, 0x6b, 0x9d, 0x90, 0xca, 0x00,
        0xf0, 0x64, 0xc9, 0x9e, 0x3d, 0xd9, 0x79, 0x11, 0xd3, 0x9f, 0xe9, 0xc5, 0xd0, 0xb2, 0x3a,
        0x22, 0x9a, 0x23, 0x4c, 0xb3, 0x61, 0x86, 0xc4, 0x81, 0x9e, 0x8b, 0x9c, 0x59, 0x27, 0x72,
        0x66, 0x32, 0x29, 0x1d, 0x6a, 0x41, 0x82, 0x11, 0xcc, 0x29, 0x62, 0xe2, 0x0f, 0xe4, 0x7f,
        0xeb, 0x3e, 0xdf, 0x33, 0x0f, 0x2c, 0x60, 0x3a, 0x9d, 0x48, 0xc0, 0xfc, 0xb5, 0x69, 0x9d,
        0xbf, 0xe5, 0x89, 0x64, 0x25, 0xc5, 0xba, 0xc4, 0xae, 0xe8, 0x2e, 0x57, 0xa8, 0x5a, 0xaf,
        0x4e, 0x25, 0x13, 0xe4, 0xf0, 0x57, 0x96, 0xb0, 0x7b, 0xa2, 0xee, 0x47, 0xd8, 0x05, 0x06,
        0xf8, 0xd2, 0xc2, 0x5e, 0x50, 0xfd, 0x14, 0xde, 0x71, 0xe6, 0xc4, 0x18, 0x55, 0x93, 0x02,
        0xf9, 0x39, 0xb0, 0xe1, 0xab, 0xd5, 0x76, 0xf2, 0x79, 0xc4, 0xb2, 0xe0, 0xfe, 0xb8, 0x5c,
        0x1f, 0x28, 0xff, 0x18, 0xf5, 0x88, 0x91, 0xff, 0xef, 0x13, 0x2e, 0xef, 0x2f, 0xa0, 0x93,
        0x46, 0xae, 0xe3, 0x3c, 0x28, 0xeb, 0x13, 0x0f, 0xf2, 0x8f, 0x5b, 0x76, 0x69, 0x53, 0x33,
        0x41, 0x13, 0x21, 0x19, 0x96, 0xd2, 0x00, 0x11, 0xa1, 0x98, 0xe3, 0xfc, 0x43, 0x3f, 0x9f,
        0x25, 0x41, 0x01, 0x0a, 0xe1, 0x7c, 0x1b, 0xf2, 0x02, 0x58, 0x0f, 0x60, 0x47, 0x47, 0x2f,
        0xb3, 0x68, 0x57, 0xfe, 0x84, 0x3b, 0x19, 0xf5, 0x98, 0x40, 0x09, 0xdd, 0xc3, 0x24, 0x04,
        0x4e, 0x84, 0x7a, 0x4f, 0x4a, 0x0a, 0xb3, 0x4f, 0x71, 0x95, 0x95, 0xde, 0x37, 0x25, 0x2d,
        0x62, 0x35, 0x36, 0x5e, 0x9b, 0x84, 0x39, 0x2b, 0x06, 0x10,
    ];

    // Test stateless reset packet.
    //
    // A 42-byte datagram whose first byte (0x40) has the most significant bit
    // clear, i.e. it is not shaped like a long-header packet. Several of the
    // `endpoint_stateless_reset_*` tests below feed it to a freshly created
    // endpoint that holds no connections.
    // NOTE(review): the trailing bytes presumably stand in for a stateless
    // reset token that the receiving endpoint does not know -- confirm.
    const TEST_STATELESS_RESET: [u8; 42] = [
        0x40, 0xc1, 0xe7, 0x9a, 0xb0, 0x94, 0xc7, 0xa2, 0x49, 0x43, 0x84, 0xbb, 0x23, 0x05, 0x6f,
        0x20, 0x11, 0x0f, 0x9b, 0x13, 0x01, 0x36, 0xf3, 0xa0, 0x45, 0xb8, 0x9f, 0x09, 0x36, 0x79,
        0x0f, 0xc2, 0x7f, 0x14, 0xed, 0x3d, 0x67, 0xbf, 0x79, 0x21, 0xad, 0xc7,
    ];

    /// Full handshake using the stock test configuration on both sides.
    ///
    /// No address token key is installed here; only the handshake is run
    /// (`handshake_only`).
    #[test]
    fn handshake_full_with_retry_disabled() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            handshake_only: true,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Full handshake against a server that has an address token key set.
    ///
    /// NOTE(review): installing the key presumably switches the server to
    /// Retry-based address validation -- confirm against `Config`.
    #[test]
    fn handshake_full_with_retry_enabled() -> Result<()> {
        let mut pair = TestPair::new();

        let client_conf = TestPair::new_test_config(false)?;
        let mut server_conf = TestPair::new_test_config(true)?;
        server_conf.set_address_token_key(vec![[1; 16]])?;

        let case_conf = CaseConf {
            handshake_only: true,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Resumed handshake: the client presents the canned test session and both
    /// 0-RTT and resumption are expected. The server gets no address token key.
    #[test]
    fn handshake_resume_with_retry_disabled() -> Result<()> {
        let mut pair = TestPair::new();

        let client_conf = TestPair::new_test_config(false)?;
        let server_conf = TestPair::new_test_config(true)?;

        let case_conf = CaseConf {
            handshake_only: true,
            session: Some(TestPair::new_test_session_state()),
            client_0rtt_expected: true,
            resumption_expected: true,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Resumed handshake where the client also presents an address token.
    ///
    /// The token is minted for 127.8.8.8 with the same key that is installed
    /// on the server; 0-RTT and resumption are both expected.
    #[test]
    fn handshake_resume_with_retry_enabled() -> Result<()> {
        let mut pair = TestPair::new();

        // Mint the token with the key the server will be configured with.
        let key = [1; 16];
        let token_ip = IpAddr::V4(Ipv4Addr::new(127, 8, 8, 8));
        let address_token = TestPair::new_test_address_token(token_ip, &key);

        let client_conf = TestPair::new_test_config(false)?;
        let mut server_conf = TestPair::new_test_config(true)?;
        server_conf.set_address_token_key(vec![key])?;

        let case_conf = CaseConf {
            handshake_only: true,
            session: Some(TestPair::new_test_session_state()),
            token: Some(address_token),
            client_0rtt_expected: true,
            resumption_expected: true,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Resumed handshake where the server cannot accept the client's session.
    ///
    /// The server is given a TLS config with its own ticket key
    /// (`[0x01; 48]`), and resumption is expected to fail
    /// (`resumption_expected = false`) even though the client still offers
    /// 0-RTT. The address token is minted for 127.8.8.8 -- the same IP the
    /// valid-token case `handshake_resume_with_retry_enabled` uses -- so the
    /// session is the only invalid input exercised by this test.
    #[test]
    fn handshake_resume_with_invalid_session() -> Result<()> {
        let mut t = TestPair::new();

        // Token built with the server's key for the IP used by the other
        // tests, so token validation is not what is being tested here.
        let token_key = [1; 16];
        let client_ip = IpAddr::V4(Ipv4Addr::new(127, 8, 8, 8));
        let token = TestPair::new_test_address_token(client_ip, &token_key);

        let cli_conf = TestPair::new_test_config(false)?;
        let mut srv_conf = TestPair::new_test_config(true)?;
        srv_conf.set_address_token_key(vec![token_key])?;

        // Replace the server TLS config with one using a different ticket key.
        let mut tls_config = TlsConfig::new_server_config(
            "src/tls/testdata/cert.crt",
            "src/tls/testdata/cert.key",
            vec![b"h3".to_vec()],
            true,
        )?;
        tls_config.set_ticket_key(&vec![0x01; 48])?;
        srv_conf.set_tls_config(tls_config);

        let mut case_conf = CaseConf::default();
        case_conf.handshake_only = true;
        case_conf.session = Some(TestPair::new_test_session_state());
        case_conf.token = Some(token);
        case_conf.client_0rtt_expected = true;
        case_conf.resumption_expected = false;

        t.run(cli_conf, srv_conf, case_conf)?;
        Ok(())
    }

    /// Resumed handshake where the client presents an address token that does
    /// not match its address.
    ///
    /// The token is minted (with the server's key) for 127.7.7.7, which
    /// differs from the 127.8.8.8 address used by the valid-token case
    /// `handshake_resume_with_retry_enabled`. Previously this test used
    /// 127.8.8.8 and was therefore identical to the valid-token test.
    /// The handshake is still expected to complete with 0-RTT offered and the
    /// session resumed.
    /// NOTE(review): presumably the server falls back to its normal address
    /// validation when the token does not validate -- confirm.
    #[test]
    fn handshake_resume_with_invalid_token() -> Result<()> {
        let mut t = TestPair::new();

        // Deliberately mint the token for an IP other than the client's.
        let token_key = [1; 16];
        let client_ip = IpAddr::V4(Ipv4Addr::new(127, 7, 7, 7));
        let token = TestPair::new_test_address_token(client_ip, &token_key);

        let cli_conf = TestPair::new_test_config(false)?;
        let mut srv_conf = TestPair::new_test_config(true)?;
        srv_conf.set_address_token_key(vec![token_key])?;

        let mut case_conf = CaseConf::default();
        case_conf.handshake_only = true;
        case_conf.session = Some(TestPair::new_test_session_state());
        case_conf.token = Some(token);
        case_conf.client_0rtt_expected = true;
        case_conf.resumption_expected = true;

        t.run(cli_conf, srv_conf, case_conf)?;
        Ok(())
    }

    /// Handshake in which only the client is configured with a connection ID
    /// length of zero.
    #[test]
    fn handshake_with_client_zero_cid() -> Result<()> {
        let mut pair = TestPair::new();

        let mut client_conf = TestPair::new_test_config(false)?;
        client_conf.set_cid_len(0);
        let server_conf = TestPair::new_test_config(true)?;

        let case_conf = CaseConf {
            handshake_only: true,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Handshake in which only the server is configured with a connection ID
    /// length of zero.
    #[test]
    fn handshake_with_server_zero_cid() -> Result<()> {
        let mut pair = TestPair::new();

        let client_conf = TestPair::new_test_config(false)?;
        let mut server_conf = TestPair::new_test_config(true)?;
        server_conf.set_cid_len(0);

        let case_conf = CaseConf {
            handshake_only: true,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Handshake in which both endpoints are configured with a connection ID
    /// length of zero.
    #[test]
    fn handshake_with_both_zero_cid() -> Result<()> {
        let mut pair = TestPair::new();

        let mut client_conf = TestPair::new_test_config(false)?;
        client_conf.set_cid_len(0);
        let mut server_conf = TestPair::new_test_config(true)?;
        server_conf.set_cid_len(0);

        let case_conf = CaseConf {
            handshake_only: true,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Handshake-only run with `packet_loss` set to 1.
    /// NOTE(review): the unit of `packet_loss` is defined by `CaseConf`
    /// (presumably a percentage) -- confirm.
    #[test]
    fn handshake_with_packet_loss() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            handshake_only: true,
            packet_loss: 1,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Handshake-only run with `packet_delay` set to 20.
    /// NOTE(review): the unit of `packet_delay` is defined by `CaseConf`
    /// (presumably milliseconds) -- confirm.
    #[test]
    fn handshake_with_packet_delay() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            handshake_only: true,
            packet_delay: 20,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Handshake-only run with `packet_reorder` set to 10.
    #[test]
    fn handshake_with_packet_reorder() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            handshake_only: true,
            packet_reorder: 10,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Handshake-only run with `packet_corruption` set to 1.
    #[test]
    fn handshake_with_packet_corruption() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            handshake_only: true,
            packet_corruption: 1,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Handshake-only run with `packet_duplication` set to 2.
    ///
    /// NOTE: "dupulication" in the function name is a typo for "duplication";
    /// the identifier is left untouched so existing test filters keep working.
    #[test]
    fn handshake_with_packet_dupulication() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            handshake_only: true,
            packet_duplication: 2,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Exercises `Endpoint::connect` in three situations: an open client
    /// endpoint (succeeds and registers one connection), a closed client
    /// endpoint (fails) and a server endpoint (fails, no connection created).
    #[test]
    fn endpoint_connect() -> Result<()> {
        let local: SocketAddr = "127.8.8.8:8888".parse().unwrap();
        let remote: SocketAddr = "127.8.8.8:8443".parse().unwrap();
        let server_name = Some("example.org");

        // An open client endpoint accepts the request and tracks the connection.
        let mut client = Endpoint::new(
            Box::new(TestPair::new_test_config(false)?),
            false,
            Box::new(ClientHandler::new(
                CaseConf::default(),
                Arc::new(AtomicBool::new(false)),
            )),
            Rc::new(MockSocket::new()),
        );
        let first_attempt = client.connect(local, remote, server_name, None, None);
        assert!(first_attempt.is_ok());
        assert_eq!(client.conns.len(), 1);

        // Once the client endpoint is closed, connecting must be refused.
        client.close();
        let after_close = client.connect(local, remote, server_name, None, None);
        assert!(after_close.is_err());

        // A server endpoint never initiates connections.
        let mut server = Endpoint::new(
            Box::new(TestPair::new_test_config(true)?),
            true,
            Box::new(ServerHandler::new(
                CaseConf::default(),
                Arc::new(AtomicBool::new(false)),
            )),
            Rc::new(MockSocket::new()),
        );
        let on_server = server.connect(local, remote, server_name, None, None);
        assert!(on_server.is_err());
        assert!(server.conns.is_empty());

        Ok(())
    }

    /// Runs one 100-byte request against a server that has an address token
    /// key installed and expects the client to be handed a new token
    /// (`new_token_expected`).
    #[test]
    fn endpoint_new_token() -> Result<()> {
        let mut pair = TestPair::new();

        let client_conf = TestPair::new_test_config(false)?;
        let mut server_conf = TestPair::new_test_config(true)?;
        server_conf.set_address_token_key(vec![[1; 16]])?;

        let case_conf = CaseConf {
            request_num: 1,
            request_size: 100,
            new_token_expected: true,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Checks the trace-id setter/getter round trip and that a random
    /// connection ID is not reported as existing on a fresh client endpoint.
    #[test]
    fn endpoint_basic_operations() -> Result<()> {
        let mut endpoint = Endpoint::new(
            Box::new(TestPair::new_test_config(false)?),
            false,
            Box::new(ClientHandler::new(
                CaseConf::default(),
                Arc::new(AtomicBool::new(false)),
            )),
            Rc::new(MockSocket::new()),
        );

        // The trace id that was set must be returned unchanged.
        let expected_id = "ClientEndpoint";
        endpoint.set_trace_id(expected_id.to_owned());
        assert_eq!(endpoint.trace_id(), expected_id);

        // No connection has been created, so any lookup must miss.
        assert!(!endpoint.conn_exist(ConnectionId::random()));

        Ok(())
    }

    /// Feeds a server endpoint an Initial packet whose version field was
    /// altered and checks that the first packet it sends back parses as a
    /// Version Negotiation packet.
    ///
    /// NOTE: "negtiation" in the function name is a typo for "negotiation";
    /// the identifier is left untouched so existing test filters keep working.
    #[test]
    fn endpoint_version_negtiation() -> Result<()> {
        // Overwrite byte 1 of the canned Initial to make the version unknown.
        let mut tampered_initial = TEST_INITIAL;
        tampered_initial[1] = 0x73;

        let socket = Rc::new(MockSocket::new());
        let mut server = Endpoint::new(
            Box::new(TestPair::new_test_config(true)?),
            true,
            Box::new(ServerHandler::new(
                CaseConf::default(),
                Arc::new(AtomicBool::new(false)),
            )),
            socket.clone(),
        );
        let pkt_info = TestTool::new_test_packet_info(false);

        // Server receives an Initial with an unknown version.
        server.recv(&mut tampered_initial, &pkt_info)?;
        server.process_connections()?;

        // Server sends Version Negotiation.
        let sent = socket.packets.borrow();
        assert!(!sent.is_empty());

        let (first_packet, _) = &sent[0];
        let (header, _) = PacketHeader::from_bytes(first_packet, 8)?;
        assert_eq!(header.pkt_type, PacketType::VersionNegotiation);
        Ok(())
    }

    /// Shared driver for the stateless reset tests.
    ///
    /// Builds an endpoint of the requested role with stateless reset switched
    /// on or off (`enable`), hands it `packet_unknown` and lets it process its
    /// connections. Anything the endpoint sends ends up in `sock`, which the
    /// caller inspects afterwards. A `ServerHandler` is used as the transport
    /// handler for both roles.
    fn endpoint_process_stateless_reset(
        sock: Rc<MockSocket>,
        packet_unknown: &mut [u8],
        is_server: bool,
        enable: bool,
    ) -> Result<()> {
        let mut config = TestPair::new_test_config(is_server)?;
        config.enable_stateless_reset(enable);

        let handler = ServerHandler::new(CaseConf::default(), Arc::new(AtomicBool::new(false)));
        let mut endpoint = Endpoint::new(
            Box::new(config),
            is_server,
            Box::new(handler),
            sock.clone(),
        );
        let pkt_info = TestTool::new_test_packet_info(is_server);

        // The endpoint has no connections, so the packet cannot be matched.
        endpoint.recv(packet_unknown, &pkt_info)?;
        endpoint.process_connections()?;
        Ok(())
    }

    /// A client endpoint with stateless reset enabled receives the canned
    /// Initial packet (which matches no connection) and is expected to answer;
    /// the first packet sent must parse as a 1-RTT packet.
    #[test]
    fn endpoint_stateless_reset_by_client() -> Result<()> {
        let mut unknown_initial = TEST_INITIAL;
        let socket = Rc::new(MockSocket::new());

        // Client receives an Initial with an unknown DCID.
        endpoint_process_stateless_reset(socket.clone(), &mut unknown_initial, false, true)?;

        // Client sends a stateless reset.
        let sent = socket.packets.borrow();
        assert!(!sent.is_empty());
        let (first_packet, _) = &sent[0];
        let (header, _) = PacketHeader::from_bytes(first_packet, 8)?;
        assert_eq!(header.pkt_type, PacketType::OneRTT);

        Ok(())
    }

    /// A server endpoint with stateless reset enabled receives the canned
    /// stateless reset packet and is expected to answer; the first packet sent
    /// must parse as a 1-RTT packet.
    #[test]
    fn endpoint_stateless_reset_by_server() -> Result<()> {
        let mut reset_packet = TEST_STATELESS_RESET;
        let socket = Rc::new(MockSocket::new());

        // Server receives a stateless packet with an unknown token.
        endpoint_process_stateless_reset(socket.clone(), &mut reset_packet, true, true)?;

        // Server sends a stateless reset.
        let sent = socket.packets.borrow();
        assert!(!sent.is_empty());

        let (first_packet, _) = &sent[0];
        let (header, _) = PacketHeader::from_bytes(first_packet, 8)?;
        assert_eq!(header.pkt_type, PacketType::OneRTT);
        Ok(())
    }

    /// With stateless reset disabled, a client endpoint that receives the
    /// canned stateless reset packet must not send anything.
    #[test]
    fn endpoint_stateless_reset_client_disabled() -> Result<()> {
        let mut reset_packet = TEST_STATELESS_RESET;
        let socket = Rc::new(MockSocket::new());

        // Client receives a stateless packet with an unknown token.
        endpoint_process_stateless_reset(socket.clone(), &mut reset_packet, false, false)?;

        // Nothing is sent while stateless reset is disabled.
        let sent = socket.packets.borrow();
        assert!(sent.is_empty());
        Ok(())
    }

    /// With stateless reset disabled, a server endpoint that receives the
    /// canned stateless reset packet must not send anything.
    #[test]
    fn endpoint_stateless_reset_server_disabled() -> Result<()> {
        let mut reset_packet = TEST_STATELESS_RESET;
        let socket = Rc::new(MockSocket::new());

        // Server receives a stateless packet with an unknown token.
        endpoint_process_stateless_reset(socket.clone(), &mut reset_packet, true, false)?;

        // Nothing is sent while stateless reset is disabled.
        let sent = socket.packets.borrow();
        assert!(sent.is_empty());
        Ok(())
    }

    /// Verifies that the address of a `Connection` does not change when the
    /// endpoint's connection container grows.
    ///
    /// The pointer to connection 0 is recorded, enough extra connections are
    /// created to change the capacity of `e.conns.conns`, and the pointer to
    /// connection 0 is then compared with the recorded one.
    #[test]
    fn endpoint_conn_raw_pointer_stability() -> Result<()> {
        let local: SocketAddr = "127.8.8.8:8888".parse().unwrap();
        let remote: SocketAddr = "127.8.8.8:8443".parse().unwrap();
        let server_name = Some("example.org");
        let mut endpoint = Endpoint::new(
            Box::new(TestPair::new_test_config(false)?),
            false,
            Box::new(ClientHandler::new(
                CaseConf::default(),
                Arc::new(AtomicBool::new(false)),
            )),
            Rc::new(MockSocket::new()),
        );

        // Create connection 0 and remember where it lives.
        assert!(endpoint
            .connect(local, remote, server_name, None, None)
            .is_ok());
        assert_eq!(endpoint.conns.len(), 1);
        let addr_before = endpoint.conn_get_mut(0).unwrap() as *const Connection;

        // Add as many connections as the current capacity, which forces the
        // container to grow.
        let initial_capacity = endpoint.conns.conns.capacity();
        for _ in 0..initial_capacity {
            assert!(endpoint
                .connect(local, remote, server_name, None, None)
                .is_ok());
        }
        assert_ne!(endpoint.conns.conns.capacity(), initial_capacity);

        // Connection 0 must still be at the recorded address.
        let addr_after = endpoint.conn_get_mut(0).unwrap() as *const Connection;
        assert_eq!(addr_before, addr_after);

        Ok(())
    }

    /// Transfers a single 16 KiB request with the stock test configuration.
    #[test]
    fn transfer_single_stream_1rtt() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            request_num: 1,
            request_size: 1024 * 16,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Transfers a single 16 KiB request on a resumed connection; the client
    /// presents the canned test session and both 0-RTT and resumption are
    /// expected.
    #[test]
    fn transfer_single_stream_0rtt_and_1rtt() -> Result<()> {
        let mut pair = TestPair::new();

        let client_conf = TestPair::new_test_config(false)?;
        let server_conf = TestPair::new_test_config(true)?;

        let case_conf = CaseConf {
            session: Some(TestPair::new_test_session_state()),
            client_0rtt_expected: true,
            resumption_expected: true,
            request_num: 1,
            request_size: 1024 * 16,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Transfers a single 16 KiB request where the client offers 0-RTT with
    /// the canned test session but the server, configured with its own ticket
    /// key (`[0x01; 48]`), is expected not to resume it.
    ///
    /// NOTE(review): this test builds its configs via `TestTool::new_test_config`
    /// and installs TLS via `set_tls_config_selector`, whereas most tests in
    /// this module use `TestPair::new_test_config` -- confirm this is intended.
    #[test]
    fn transfer_single_stream_0rtt_reject() -> Result<()> {
        let mut pair = TestPair::new();

        let client_conf = TestTool::new_test_config(false)?;
        let mut server_conf = TestTool::new_test_config(true)?;

        // Server-side TLS config with a ticket key of its own.
        let mut server_tls = TlsConfig::new_server_config(
            "src/tls/testdata/cert.crt",
            "src/tls/testdata/cert.key",
            vec![b"h3".to_vec()],
            true,
        )?;
        server_tls.set_ticket_key(&vec![0x01; 48])?;
        server_conf.set_tls_config_selector(Arc::new(server_tls));

        let case_conf = CaseConf {
            session: Some(TestPair::new_test_session_state()),
            client_0rtt_expected: true,
            resumption_expected: false,
            request_num: 1,
            request_size: 1024 * 16,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }

    /// Transfers a single 16 KiB request with `packet_loss` set to 1.
    #[test]
    fn transfer_single_stream_with_packet_loss() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            request_num: 1,
            request_size: 1024 * 16,
            packet_loss: 1,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Transfers a single 16 KiB request with `packet_delay` set to 20.
    #[test]
    fn transfer_single_stream_with_packet_delay() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            request_num: 1,
            request_size: 1024 * 16,
            packet_delay: 20,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Transfers a single 16 KiB request with `packet_reorder` set to 2.
    #[test]
    fn transfer_single_stream_with_packet_reorder() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            request_num: 1,
            request_size: 1024 * 16,
            packet_reorder: 2,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Transfers a single 16 KiB request with `packet_corruption` set to 1.
    #[test]
    fn transfer_single_stream_with_packet_corruption() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            request_num: 1,
            request_size: 1024 * 16,
            packet_corruption: 1,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Transfers a single 16 KiB request with `packet_duplication` set to 2.
    #[test]
    fn transfer_single_stream_with_packet_duplication() -> Result<()> {
        let mut pair = TestPair::new();

        let case_conf = CaseConf {
            request_num: 1,
            request_size: 1024 * 16,
            packet_duplication: 2,
            ..CaseConf::default()
        };

        pair.run_with_test_config(case_conf)?;
        Ok(())
    }

    /// Transfers eight requests of 8 KiB each using explicitly created default
    /// client and server test configurations.
    #[test]
    fn transfer_multi_stream_normal() -> Result<()> {
        let mut pair = TestPair::new();

        let client_conf = TestPair::new_test_config(false)?;
        let server_conf = TestPair::new_test_config(true)?;

        let case_conf = CaseConf {
            request_num: 8,
            request_size: 1024 * 8,
            ..CaseConf::default()
        };

        pair.run(client_conf, server_conf, case_conf)?;
        Ok(())
    }
}
